Failed to allocate memory within the configured max blocking time

Kafka版本0.9.0.1,发送数据时报错

1
Failed to allocate memory within the configured max blocking time

原因很明显,如果producer端缓存(buffer.memory,默认32M)满了的话,在一定时间(max.block.ms,默认60s)内如果数据无法被放入缓存,则抛出该异常。

发生该异常之后,producer的性能急剧下降,TPS从10000+降低到50.

经过分析,原因是在尝试把数据放入缓存之前(Buffer.allocate),会在Condition队列waiters中插入一条新记录,

1
2
Condition moreMemory = this.lock.newCondition();
this.waiters.addLast(moreMemory);

如果一定时间内没有能放到缓存中,则直接抛出异常,moreMemory并没有从waiters中移除。

而producer的缓存刷新的逻辑(RecordAccumulator.ready)如下:

1
2
3
boolean exhausted = this.free.queued() > 0;
……
boolean sendable = full || expired || exhausted || closed || flushInProgress();

其中free.queued() 的返回值就是waiters.size(),因此如果抛出本文开头所说的异常,那么之后每发一条数据,sendable都会为true,所以每发一条数据都会刷新一次缓存,相当于是同步发送,效率非常差。

好在在0.10中已经修复了这个bug,在抛出异常之前,先调用

1
this.waiters.remove(moreMemory);

JIRA

https://issues.apache.org/jira/browse/KAFKA-3651